package org.example.kafka.commit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.example.kafka.common.CommonConstant;
import org.example.kafka.common.KafkaConstants;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * 特定提交
 */
public class CommitSpecial {
    public static void main(String[] args) {
        //消息消费者
        Properties properties = KafkaConstants.consumerConfig(
                "CommitSpecial",
                StringDeserializer.class,
                StringDeserializer.class);
        // 取消自动提交
        properties.put("enable.auto.commit",false);
        // 创建消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        Map<TopicPartition, OffsetAndMetadata> currOffsets = new HashMap<>();
        int count = 0;
        try{
            // 订阅主题
            consumer.subscribe(Collections.singletonList(CommonConstant.CONSUMER_COMMIT_TOPIC));
            while (true){
                // 间隔多长时间去拉取一次数据
                ConsumerRecords<String,String> records = consumer.poll(500);
                for (ConsumerRecord<String,String> record:records){
                    System.out.println(String.format(
                            "主题：%s，分区：%d，偏移量：%d，key：%s，value：%s",
                            record.topic(),record.partition(),record.offset(),
                            record.key(),record.value()));
                    currOffsets.put(new TopicPartition(record.topic(),record.partition()),
                            new OffsetAndMetadata(record.offset()+1,"no meta"));
                    if(count%11==0){
                        consumer.commitAsync(currOffsets,null);
                    }
                    count++;
                }
            }
        }finally {
            consumer.close();
        }


    }
}
